home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
msn
/
P2P
/
P2PData.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
19KB
|
599 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
import logging
import random
import sys
import struct
import uuid
import util
from util import callsback, get, call_later
from util.Events import EventMixin
try:
from CStringIO import StringIO
except ImportError:
from StringIO import StringIO
log = logging.getLogger('msn.p2p.data')
flagged = lambda v, f: f & v == f
randid = lambda : random.randint(4, sys.maxint // 2 - 5)
class Flags:
names = {
0: 'none',
1: 'sync',
2: 'ack',
4: 'wait',
8: 'err',
32: 'data',
64: 'byea',
128: 'byem',
16777264: 'file',
256: 'dchs' }
NONE = 0
ONE = 1
UNKNOWN = ONE
SYNC = ONE
ACK = 2
WAIT = 4
ERROR = 8
DATA = 32
BYEACK = 64
BYEMSG = 128
FILE = 16777264
HANDSHAKE = 256
DCHS = HANDSHAKE
Header = util.new_packable(('session', 'I', 'msgid', 'I', 'offset', 'Q', 'total', 'Q', 'length', 'I', 'flags', 'I', 'msgid_ack', 'I', 'msgid_ackack', 'I', 'total_ack', 'Q'), byteorder = '<')
class P2PMessage(object):
def __init__(self, sender, recipient, id, flags, session_id, app_id, size, content, acked_msg_id = None, prev_acked_msg_id = 0, acked_data_size = 0):
self.sender = get(sender, 'name', sender)
self.recipient = get(recipient, 'name', recipient)
self.content = content
if acked_msg_id is None:
acked_msg_id = randid()
self.header = Header(session = session_id, msgid = id, offset = 0, total = size, length = 0, flags = flags, msgid_ack = acked_msg_id, msgid_ackack = prev_acked_msg_id, total_ack = acked_data_size)
self.app_id = self.footer = app_id
self.transferred = 0
def reset(self):
if self.content is not None:
try:
self.content.seek(0)
except ValueError:
pass
except:
None<EXCEPTION MATCH>ValueError
None<EXCEPTION MATCH>ValueError
def write(self, data):
self.content.write(data)
self.transferred = self.content.tell()
def seek(self, position):
self.content.seek(position)
def read(self, max_size):
if self.content is not None:
if self.content.closed:
log.error('Read called on a message with closed content! wtf.')
return None
data = self.content.read(max_size)
self.transferred = self.content.tell()
else:
data = ''
return data
def tell(self):
return self.transferred
def size(self):
return self.header.total
size = property(size)
def complete(self):
return self.transferred == self.size
complete = property(complete)
def __hash__(self):
return hash((self.sender, self.recipient, self.header.pack()))
def __repr__(self):
content = self.content
if isinstance(content, StringIO):
content = content.getvalue()
contentstr = ''
if content:
try:
contentstr = 'content=%r' % content[:30]
except Exception:
pass
except:
None<EXCEPTION MATCH>Exception
None<EXCEPTION MATCH>Exception
return '<%s session=%d msgid=%d size=%d offset=%d total=%d flags=%d type(content)=%r %s>' % (type(self).__name__, self.header.session, self.header.msgid, self.header.length, self.tell(), self.size, self.header.flags, type(content).__name__, contentstr)
class P2PTransport(EventMixin):
events = EventMixin.events | set(('contacts_changed', 'recv_data', 'send_data'))
def __init__(self, client):
EventMixin.__init__(self)
client._p2p_manager._register_transport(self)
self.p2p_clients = 0
def p2p_peers(self):
raise NotImplementedError
p2p_peers = property(p2p_peers)
def p2p_rating(self):
raise NotImplementedError
p2p_rating = property(p2p_rating)
def p2p_max_msg_size(self):
raise NotImplementedError
p2p_max_msg_size = property(p2p_max_msg_size)
def p2p_send(self, data, callback = None):
raise NotImplementedError
p2p_send = callsback(p2p_send)
def p2p_overhead(self):
raise NotImplementedError
p2p_overhead = property(p2p_overhead)
def build_data(self, header, body, footer):
raise NotImplementedError
def __repr__(self):
return '<%s p2pclients=%r id=%d>' % (type(self).__name__, get(self, 'p2p_clients', None), id(self))
class P2PManager(EventMixin):
events = EventMixin.events | set(('recv_msg_start', 'recv_msg_end', 'recv_error', 'send_msg_start', 'send_msg_end', 'send_error', 'recv_data', 'send_data'))
def __init__(self, client):
EventMixin.__init__(self)
self.client = client
self._transports = []
self._P2PManager__incoming = { }
self._P2PManager__outgoing = { }
self._P2PManager__sent = { }
self._P2PManager__last_acked = None
self.sort_transports()
def close_all(self):
for transport in self._transports:
try:
transport.Disconnect()
continue
import traceback as traceback
traceback.print_exc()
continue
def _outgoing(self):
return self._P2PManager__outgoing
_outgoing = property(_outgoing)
def sort_transports(self):
self._best = { }
for transport in self._transports:
rating = transport.p2p_rating
for peer in transport.p2p_peers:
if peer in self._best:
prev_rating = self._best[peer].p2p_rating
if rating > prev_rating:
self._best[peer] = transport
rating > prev_rating
self._best[peer] = transport
def _register_transport(self, transport):
try:
v = transport._P2PManager__registered
except AttributeError:
v = False
if v:
log.info('Transport was already registered. Returning from register.')
return None
bind = transport.bind
bind('contacts_changed', self.transport_sorter)
bind('recv_data', self._on_recv_data)
bind('send_data', self._on_send_data)
self._transports.append(transport)
transport._P2PManager__registered = True
self.sort_transports()
def _unregister_transport(self, transport):
try:
v = transport._P2PManager__registered
except AttributeError:
v = False
if not v:
log.info('Transport was not registered. Returning from unregister.')
return None
log.debug('P2PManager removing transport %r', transport)
unbind = transport.unbind
unbind('contacts_changed', self.transport_sorter)
unbind('recv_data', self._on_recv_data)
unbind('send_data', self._on_send_data)
transport._P2PManager__registered = False
self._transports.remove(transport)
self.sort_transports()
def transport_sorter(self, *a):
self.sort_transports()
def get_best(self, peer, callback = None):
try:
return self._best[peer]
except KeyError:
log.info('No transport found for %s, returning default (bests: %r)', peer, self._best)
return self.client._get_default_p2p_transport(peer, callback = callback)
get_best = callsback(get_best)
def _on_send_data(self):
self.event('send_data')
def _on_recv_data(self, transport, sender, data, has_footer = True):
try:
header = Header.unpack(data[:48])
data = data[48:]
except Exception:
e = None
print repr(data)
raise
log.debug('P2PManager got message(flags=%d) from %r via %r', header.flags, sender, transport)
if has_footer:
footer = struct.unpack('>L', data[-4:])[0]
data = data[:-4]
else:
footer = 0
if footer == 0 and data == '\x00\x00\x00\x00':
footer = 1
try:
pass
except:
import traceback
traceback.print_exc()
raise
if header.total == header.offset + header.length:
log.critical('Got completed P2PMessage with flags %d (%s)', header.flags, get(Flags.names, header.flags, 'Super duper unknown flags %d' % (header.flags,)))
if flagged(header.flags, Flags.ACK):
self._P2PManager__last_acked = header.msgid_ack
return self._process_ack(header.msgid_ack)
elif flagged(header.flags, Flags.ERROR):
log.info('Got binary transport error')
if header.msgid_ack in self._P2PManager__outgoing:
self._P2PManager__outgoing[header.msgid_ack].on_done()
self.event('recv_error')
return None
elif flagged(header.flags, Flags.SYNC):
try:
sent = self._P2PManager__outgoing[header.msgid_ack]
except Exception:
e = None
log.error('P2P sync received: %s', list(header))
log.error('Error acks total %d, my message says offset %d', header.total_ack, sent.msg.header.offset)
sent.msg.seek(header.total_ack - sent.msg.header.length)
return None
elif flagged(header.flags, Flags.HANDSHAKE):
log.warning('Got Direct Connect Handshake Message (DCHS) header=<%r>', list(header))
their_nonce = uuid.UUID(bytes_le = header.pack()[-16:])
msn_hash = msn_hash
import SLPCalls
None(None, log.warning, 'Their %shashed Nonce (unhashed: %s, hashed: %s)' if transport.in_key is None else '', their_nonce if not transport.in_key else their_nonce)
log.warning('My unhashed Nonce (unhashed: %s, hashed: %s)', transport.out_key, transport.out_hkey)
if transport.in_key is None:
transport.in_key = their_nonce
elif transport.in_hkey is None:
transport.in_hkey = their_nonce
else:
log.info('Had all the nonces already. (How did that happen?)')
transport._send_nonce(header.msgid, header.msgid_ack)
transport.event('on_ready')
return None
elif flagged(header.flags, Flags.BYEACK):
log.info('Got ack for BYE message. Going to send waiting flags')
elif flagged(header.flags, Flags.WAIT):
log.critical("Got waiting message. Here's the header: %r", list(header))
msg = P2PMessage(None, sender, header.msgid, 6, header.session, footer, 0, None)
self.send_message(msg)
return None
id = header.msgid
if id not in self._P2PManager__incoming:
log.info('Got new message')
msg = self._process_new(sender, header, footer)
transport.p2p_clients += 1
else:
log.debug('Continuing message with id=(%r)', id)
msg = self._P2PManager__incoming[id]
try:
if header.offset != msg.tell():
log.warning('Header offset does not match file offset')
msg.seek(header.offset)
msg.write(data)
except:
self.event('recv_error')
return None
if msg.complete:
log.info('Received message')
transport.p2p_clients -= 1
self._process_msg(header, footer, msg)
self.event('recv_data')
def _process_ack(self, id):
if id in self._P2PManager__sent:
msg = self._P2PManager__sent.pop(id)
log.info('Got ack for message, NOT resetting it: %r', msg)
self.event('send_msg_end', msg)
else:
log.error('got ack for unknown message')
def _process_new(self, sender, header, footer):
content = self.client.slp_call_master._create_message_content(header, footer)
msg = P2PMessage(sender, None, header.msgid, header.flags, header.session, footer, header.total, content)
self._P2PManager__incoming[header.msgid] = msg
self.event('recv_msg_start', msg)
return msg
def _process_msg(self, header, footer, msg):
log.info('P2P message complete (%s)', msg)
self.send_ack(header, footer, msg)
msg.reset()
try:
self.event('recv_msg_end', self, msg)
except Exception:
e = None
import traceback
traceback.print_stack()
traceback.print_exc()
del self._P2PManager__incoming[header.msgid]
def send_ack(self, header, footer, msg):
try:
flags = None if flagged(header.flags, Flags.BYEMSG) else Flags.ACK
log.info('Going to send ack with flags %s', flags)
try:
id = self._P2PManager__last_acked + 1
except:
id = randid()
finally:
self._P2PManager__last_acked = None
sender = get(msg, 'sender')
ack_msg = P2PMessage(None, sender, id, flags, header.session, footer, header.total, None, header.msgid, header.msgid_ack, header.total)
self.send_message(ack_msg)
except Exception:
e = None
import traceback
traceback.print_exc()
raise e
def send_with_producer(self, msg, callback = None):
if msg.header.msgid not in self._P2PManager__outgoing:
log.warning('Sending producer for msg with id %d', msg.header.msgid)
self.event('send_msg_start', msg)
prod = self.make_producer(msg, callback = callback)
self._P2PManager__outgoing[msg.header.msgid] = prod
self._send_producer(prod)
else:
log.warning('Got producer for %d again (???). Not sending it.', msg.header.msgid)
return None
send_with_producer = callsback(send_with_producer)
send_message = send_with_producer
def _send_producer(self, prod):
try:
if not prod.transport._P2PManager__registered:
self._register_transport(prod.transport)
except Exception:
e = None
import traceback
traceback.print_exc()
raise e
try:
prod.push()
except TypeError:
e = None
log.info("Can't send this producer (%r) on its transport. Need to find a new transport.", prod)
import traceback
traceback.print_exc()
def make_producer(self, msg, callback = None):
log.info('P2PData making a producer for %r', msg)
prod = P2PProducer(self, msg, callback.success, callback.error)
return prod
make_producer = callsback(make_producer)
def _unqueue(self, msg):
del self._P2PManager__outgoing[msg.header.msgid]
if not flagged(msg.header.flags, Flags.ACK):
self._P2PManager__sent[msg.header.msgid] = msg
class P2PProducer(object):
def __init__(self, master, msg, whendone = None, on_error = None):
self.master = master
self.msg = msg
self._transport = self.master.get_best(self.msg.recipient)
self._transport.p2p_clients += 1
self._P2PProducer__finished = False
def donothing():
pass
self._oncomplete = self._transport if whendone is not None else donothing
self._onerror = None if on_error is not None else donothing
self.master.bind('send_msg_end', self.on_ack)
def transport(self):
if self._P2PProducer__finished:
raise AttributeError
self._transport.p2p_clients -= 1
old_xport = self._transport
self._transport = self.master.get_best(self.msg.recipient, error = self._onerror)
self._transport.p2p_clients += 1
return self._transport
transport = property(transport)
def recipient(self):
return self.msg.recipient
recipient = property(recipient)
def more(self):
if self._P2PProducer__finished:
log.info('Message %r complete or cancelled, returning None', self.msg)
return None
msg = self.msg
transport = self.transport
if hasattr(transport, '_super_secret_msgid'):
msg.header.msgid_ack = transport._super_secret_msgid
(header, body, footer) = _next_msg(msg, transport.p2p_max_msg_size - transport.p2p_overhead)
if body == body and footer == footer:
pass
elif footer == None:
if not msg.complete:
log.error('Message %r is not complete but had no more data', msg)
else:
log.warning('Message %r complete. Returning None')
self.on_done()
return None
data = transport.build_data(header, body, footer)
transport.event('send_data')
return data
def on_ack(self, msg):
if msg.header.msgid == self.msg.header.msgid:
log.info('Finished producer calling %r', self._oncomplete)
self._oncomplete()
self.master.unbind('send_msg_end', self.on_ack)
def on_done(self):
log.info('Producer finished')
self._P2PProducer__finished = True
self.master._unqueue(self.msg)
self._transport.p2p_clients -= 1
del self._transport
def push(self):
transport = self.transport
transport.push_with_producer(self, error = self._onerror)
def __repr__(self):
return None % ('<%s message=%r, %s>', type(self).__name__, self.msg if self._P2PProducer__finished else 'transport=%r' % self._transport)
def _next_msg(msg, size):
header = msg.header
header.offset = msg.transferred
body = msg.read(size)
if body is None:
return (None, None, None)
blen = len(body)
header.length = blen
return (header.pack(), body, struct.pack('>L', msg.footer))